package com.xiaofan.apitest.transform

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.scala._

/**
 * Demo job that walks through the basic Flink DataStream transformations:
 * keyed rolling aggregation (`minBy`), a custom `reduce`, splitting a stream
 * into tagged sub-streams, and merging streams with `connect` / `union`.
 *
 * Only `coMapResultStream` has a sink attached (`print`); the other derived
 * streams are built purely for illustration.
 */
object TransformTest {

  /**
   * Builds and runs the transformation pipeline over a CSV file of sensor readings.
   *
   * Each input line is expected to look like `id,timestamp,temperature`.
   * A line with fewer than three fields or with non-numeric timestamp /
   * temperature makes the map function throw and fails the job.
   *
   * @param args optional; when present, `args(0)` is used as the input file
   *             path instead of the built-in default location
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallel instance so the printed output keeps a deterministic order.
    env.setParallelism(1)

    // The first command-line argument overrides the sample file location, so the
    // job is not tied to one developer machine; with no arguments the original
    // default path is used.
    val defaultInputPath = "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputPath = args.headOption.getOrElse(defaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Parse every CSV line into a SensorReading. Fields are trimmed so that
    // lines written as "id, ts, temp" do not break `toLong` (which rejects
    // surrounding whitespace) or produce ids that differ only by a space.
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",").map(_.trim)
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    /*
      min:    tracks the minimum of the chosen field per key; the remaining
              fields keep the values of the first element seen for that key
      minBy:  emits the complete element that holds the minimum value per key
      reduce: custom aggregation - here: minimum temperature combined with the
              timestamp of the latest element
     */

    // 1. Keyed aggregation: for every sensor emit the reading with the lowest
    //    temperature so far; the timestamp is the one belonging to that reading.
    val aggStream: DataStream[SensorReading] = dataStream
      .keyBy(_.id)
      .minBy("temperature")

    // 2. Emit the lowest temperature so far together with the timestamp of the
    //    most recently received element - this needs a custom reduce.
    val resultStream: DataStream[SensorReading] = dataStream
      .keyBy(_.id)
      .reduce(
        (curState, newData) => {
          SensorReading(curState.id, newData.timestamp, curState.temperature.min(newData.temperature))
        }
      )

    // 3. Multi-stream transformations
    // 3.1. Split: tag readings as "high" (temperature > 30.0) or "low" and
    //      select each tag as its own stream.
    // NOTE(review): split/select is deprecated in newer Flink releases in favour
    // of side outputs - confirm against the Flink version this project builds with.
    val splitStream: SplitStream[SensorReading] = dataStream.split(
      data => {
        if (data.temperature > 30.0) Seq("high") else Seq("low")
      }
    )
    val highStream: DataStream[SensorReading] = splitStream.select("high")
    val lowStream: DataStream[SensorReading] = splitStream.select("low")

    // 3.2. Connect: combine two streams whose element types differ.
    val warningStream: DataStream[(String, Double)] = highStream.map { data => (data.id, data.temperature) }
    val connectedStream: ConnectedStreams[(String, Double), SensorReading] = warningStream.connect(lowStream)

    // CoMap: one function per input stream; the two results have different
    // tuple arities, hence the common element type Product.
    val coMapResultStream: DataStream[Product] = connectedStream.map(
      warningData => (warningData._1, warningData._2, "warning!"),
      lowTempData => (lowTempData.id, "healthy~")
    )

    coMapResultStream.print("coMapResultStream")

    // 3.3. Union: merge streams of the same element type (unlike connect,
    //      which accepts two different element types).
    val unionStream: DataStream[SensorReading] = highStream.union(lowStream)

    env.execute("transform test")

  }
}
